昨日已先行提過Flux,可方便處理一連串指定類型事件,所以說Flux就像瑞凡,被眾人討厭的瑞凡,散播出去,就難收回了,所以我們說,瑞凡,回不去了,而Mono用於處理單一指定類型事件,可說是一種單一(Single)架構,其中也包含了許多與Flux相似的靜態方法,如:just、empty、error及never等用法,所以只要出錯位置都可以確切地發現,而Mono有許多針對自身元件設計的特別方法,較常被開發者使用的有fromCallable、fromCompletionStage、fromFuture、fromRunnable和fromSupplier方法,這五種方法分別從Callable、CompletionStage、CompletableFuture、Runnable和Supplier中創建了Mono對象,我是小編威斯丁,我將以fromCallable作原理與範例介紹。
Mono架構是處理0到1個異步發射器,他是採用一種非阻塞式的架構,可以確保每個指定類型事件可以運算完畢,不受其他執行緒的影響,Mono是一個專一的Publisher,也就是說最多只能發射出一個類別,可在運作元subscribe任務後,以onComplete方法或onError方法结束。可用於操作Flux子集,可將Mono與另一個發布者的操作與做組合,並切換到Flux資料序列,如Mono.concatWith(Publisher)進行返回一個Flux ,而Mono.then(Mono)則返回一個Mono,亦可用於表示只有執行概念且無返回物件值地執行緒。如:Mono,我們可參照以下流程圖,可得知粗黑垂直線為onComplete方法,紅色X代表onError方法,僅支援一次性發射與運作性任務,提供給各位開發者作參考。
圖一、Mono 物件運作流程圖
小編延續先前範例,將台灣區及中國區建立產品的程式碼區段,改為Mono單一執行緒處理方式,確版建立過程不受其他程序影響,可看出在成功時(onComplete)會寫出建立成功的log,錯誤時(onError)會噴出錯誤例外及寫錯誤Log。
// Taiwan create product flow
@Override
public SeaFood createSeaFood(SeaFood seaFood) throws SeaFoodRetailerGenericException {
validateNullId(seaFood);
Mono.fromCallable(() ->seaFood).subscribe(
new Consumer<SeaFood>() {
@Override
public void accept(SeaFood seaFood1) {
SEA_FOOD_CACHE_TAIWAN.asMap().putIfAbsent(seaFood.getId(),seaFood);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
try {
logger.error("Taiwan product create fail. ex:{}",throwable.getMessage());
throw new SeaFoodRetailerGenericException("Out of EXPECT error.");
} catch (SeaFoodRetailerGenericException e) {
e.printStackTrace();
}
}
},
()->logger.info("Create Taiwan product success ! "));
return seaFood;
}
//china create product flow
@Override
public SeaFood createSeaFood(SeaFood seaFood) throws SeaFoodRetailerGenericException {
validateNullId(seaFood);
validateNullId(seaFood);
Mono.fromCallable(() ->seaFood).subscribe(
new Consumer<SeaFood>() {
@Override
public void accept(SeaFood seaFood1) {
SEA_FOOD_CACHE_CHINESE.asMap().putIfAbsent(seaFood.getId(),seaFood);
}
},
new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) {
try {
logger.error("China product create fail. ex:{}",throwable.getMessage());
throw new SeaFoodRetailerGenericException("Out of EXPECT error.");
} catch (SeaFoodRetailerGenericException e) {
e.printStackTrace();
}
}
},
()->logger.info("Create China product success ! "));
return seaFood;
}
透過Postman發射Create all Product測試結果,可看到以下成功的log,可看出我們一個session發射出兩個執行緒(Thread)。
23:14:08.476 INFO 21055 --- [nio-8080-exec-2] o.s.web.servlet.DispatcherServlet : Completed initialization in 10 ms
23:14:08.651 INFO 21055 --- [nio-8080-exec-2] s.s.s.s.SeaFoodRetailerServiceImpl : Create Taiwan product success !
23:14:08.654 INFO 21055 --- [nio-8080-exec-2] s.s.s.s.ChinaSeaFoodRetailerServiceImpl : Create China product success !
Subscribe model : {"id":"C-0099","name":"A-cha Crab","description":"Opilio is the primary species referred to as A-cha crab."}
小編透過以上流程與說明提供給各位開發者作參考,架構各位可參考昨日所帶出的Flux資料流架構,都是共通繼承CoreSubscriber介面,僅差別在Mono而外拉出專用的LambdaMonoSubscriber介面,並再客製其特有的方法,其所有處理的靜態方法都是透過onAssembly進行轉換後回傳新Mono對象,為一種方法鏈(Method Chain)寫法。
Spring Reactive Stack(一)響應式程式設計中Mono和Flux